#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

library(testthat)

# Label used by testthat when reporting the tests in this file.
context("MLlib regression algorithms, except for tree-based algorithms")

# Tests for MLlib regression algorithms in SparkR.
# Start a Spark session, with Hive support disabled, before any test runs.
# `sparkRTestMaster` is not defined in this file; presumably the test harness
# provides it -- TODO confirm.
sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)

test_that("formula of spark.glm", {
  # Every case fits the same model with spark.glm and with base R glm() on iris
  # and requires the two sets of predictions to agree to within 1e-6.
  irisDF <- suppressWarnings(createDataFrame(iris))

  # directly calling the spark API
  # case 1: "." expansion, a removed term and no intercept
  sparkFit <- spark.glm(irisDF, Sepal_Width ~ . - Species + 0)
  sparkPred <- predict(sparkFit, irisDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ . - Species + 0, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # case 2: interaction between a categorical and a numeric feature
  sparkFit <- spark.glm(irisDF, Sepal_Width ~ Species:Sepal_Length)
  sparkPred <- predict(sparkFit, irisDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ Species:Sepal.Length, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # case 3: a formula built from long column names must still work
  longDF <- suppressWarnings(createDataFrame(iris))
  longDF$LongLongLongLongLongName <- longDF$Sepal_Width
  longDF$VeryLongLongLongLonLongName <- longDF$Sepal_Length
  longDF$AnotherLongLongLongLongName <- longDF$Species
  sparkFit <- spark.glm(
    longDF,
    LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName
  )
  sparkPred <- predict(sparkFit, longDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)
})

test_that("spark.glm and predict", {
  # Fits spark.glm under several families and checks that predictions are
  # doubles and, where a reference is available, match base R.
  irisDF <- suppressWarnings(createDataFrame(iris))

  # gaussian family
  gaussianFit <- spark.glm(irisDF, Sepal_Width ~ Sepal_Length + Species)
  gaussianPred <- predict(gaussianFit, irisDF)
  firstRow <- take(select(gaussianPred, "prediction"), 1)
  expect_equal(typeof(firstRow$prediction), "double")
  sparkVals <- collect(select(gaussianPred, "prediction"))
  localFit <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # poisson family
  poissonFit <- spark.glm(
    irisDF, Sepal_Width ~ Sepal_Length + Species,
    family = poisson(link = identity)
  )
  poissonPred <- predict(poissonFit, irisDF)
  firstRow <- take(select(poissonPred, "prediction"), 1)
  expect_equal(typeof(firstRow$prediction), "double")
  sparkVals <- collect(select(poissonPred, "prediction"))
  localVals <- suppressWarnings(
    predict(
      glm(Sepal.Width ~ Sepal.Length + Species, data = iris, family = poisson(link = identity)),
      iris
    )
  )
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # Gamma family: only the printed summary is checked
  xs <- runif(100, -1, 1)
  ys <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * xs), shape = 10)
  gammaDF <- as.DataFrame(data.frame(x = xs, y = ys))
  gammaFit <- glm(y ~ x, family = Gamma, data = gammaDF)
  printed <- capture.output(print(summary(gammaFit)))
  expect_true(any(grepl("Dispersion parameter for gamma family", printed)))

  # tweedie family
  tweedieFit <- spark.glm(
    irisDF, Sepal_Width ~ Sepal_Length + Species,
    family = "tweedie", var.power = 1.2, link.power = 0.0
  )
  tweediePred <- predict(tweedieFit, irisDF)
  firstRow <- take(select(tweediePred, "prediction"), 1)
  expect_equal(typeof(firstRow$prediction), "double")
  sparkVals <- collect(select(tweediePred, "prediction"))

  # The reference values are rebuilt by hand from hard-coded coefficients so the
  # test does not depend on statmod. The coefficients come from:
  #' library(statmod)
  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
  #'             family = tweedie(var.power = 1.2, link.power = 0.0))
  #' print(coef(rModel))

  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
  design <- model.matrix(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- exp(as.numeric(design %*% rCoef))
  expect_true(all(abs(localVals - sparkVals) < 1e-5), localVals - sparkVals)

  # stats::predict must still work on a plain lm fit
  x <- rnorm(15)
  y <- x + rnorm(15)
  lmFit <- lm(y ~ x)
  expect_equal(length(predict(lmFit)), 15)
})

test_that("spark.glm summary", {
  # Each section fits a model with spark.glm and, where a reference exists, with
  # base R glm(), then compares the two summaries.
  #
  # Row names are compared with expect_equal() rather than all(a == b): `==`
  # recycles its operands, so a length mismatch could otherwise go unnoticed.

  # prepare dataset
  Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3)
  Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5)
  Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2)
  Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica",
               "versicolor", "virginica")
  dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE)

  # gaussian family
  training <- suppressWarnings(createDataFrame(dataset))
  stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species))
  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))

  # test summary coefficients return matrix type
  expect_true(is.matrix(stats$coefficients))
  expect_true(class(stats$coefficients[, 1]) == "numeric")

  coefs <- stats$coefficients
  rCoefs <- rStats$coefficients
  expect_true(all(abs(rCoefs - coefs) < 1e-4))
  expect_equal(
    rownames(stats$coefficients),
    c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))
  expect_equal(stats$dispersion, rStats$dispersion)
  expect_equal(stats$null.deviance, rStats$null.deviance)
  expect_equal(stats$deviance, rStats$deviance)
  expect_equal(stats$df.null, rStats$df.null)
  expect_equal(stats$df.residual, rStats$df.residual)
  expect_equal(stats$aic, rStats$aic)

  # the printed summary has the expected layout and AIC
  out <- capture.output(print(stats))
  expect_match(out[2], "Deviance Residuals:")
  expect_true(any(grepl("AIC: 35.84", out)))

  # binomial family
  df <- suppressWarnings(createDataFrame(dataset))
  training <- df[df$Species %in% c("versicolor", "virginica"), ]
  stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width,
                             family = binomial(link = "logit")))

  rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ]
  rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
                        family = binomial(link = "logit")))

  coefs <- stats$coefficients
  rCoefs <- rStats$coefficients
  expect_true(all(abs(rCoefs - coefs) < 1e-4))
  expect_equal(
    rownames(stats$coefficients),
    c("(Intercept)", "Sepal_Length", "Sepal_Width"))
  expect_equal(stats$dispersion, rStats$dispersion)
  expect_equal(stats$null.deviance, rStats$null.deviance)
  expect_equal(stats$deviance, rStats$deviance)
  expect_equal(stats$df.null, rStats$df.null)
  expect_equal(stats$df.residual, rStats$df.residual)
  expect_equal(stats$aic, rStats$aic)

  # Test spark.glm works with weighted dataset
  # (cbind() takes the column names from these variable names, so keep them)
  a1 <- c(0, 1, 2, 3)
  a2 <- c(5, 2, 1, 3)
  w <- c(1, 2, 3, 4)
  b <- c(1, 0, 1, 0)
  data <- as.data.frame(cbind(a1, a2, w, b))
  df <- createDataFrame(data)

  stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w"))
  rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w))

  coefs <- stats$coefficients
  rCoefs <- rStats$coefficients
  expect_true(all(abs(rCoefs - coefs) < 1e-3))
  expect_equal(rownames(stats$coefficients), c("(Intercept)", "a1", "a2"))
  expect_equal(stats$dispersion, rStats$dispersion)
  expect_equal(stats$null.deviance, rStats$null.deviance)
  expect_equal(stats$deviance, rStats$deviance)
  expect_equal(stats$df.null, rStats$df.null)
  expect_equal(stats$df.residual, rStats$df.residual)
  expect_equal(stats$aic, rStats$aic)

  # Test spark.glm works with offset
  training <- suppressWarnings(createDataFrame(dataset))
  stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
                             family = poisson(), offsetCol = "Petal_Length"))
  rStats <- suppressWarnings(summary(glm(Sepal.Width ~ Sepal.Length + Species,
                        data = dataset, family = poisson(), offset = dataset$Petal.Length)))
  expect_true(all(abs(rStats$coefficients - stats$coefficients) < 1e-3))

  # Test summary works on base GLM models
  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = dataset)
  baseSummary <- summary(baseModel)
  expect_true(abs(baseSummary$deviance - 11.84013) < 1e-4)

  # Test spark.glm works with regularization parameter
  # (reuses a1, a2 and b from the weighted-dataset section)
  data <- as.data.frame(cbind(a1, a2, b))
  df <- suppressWarnings(createDataFrame(data))
  regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
  expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result

  # Test spark.glm works on collinear data
  # (the second column of A is exactly twice the first)
  A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2)
  b <- c(1, 2, 3, 4)
  data <- as.data.frame(cbind(A, b))
  df <- createDataFrame(data)
  stats <- summary(spark.glm(df, b ~ . - 1))
  coefs <- stats$coefficients
  expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4))
})

test_that("spark.glm save/load", {
  # Round-trips a spark.glm model through write.ml/read.ml and checks that the
  # summary of the reloaded model matches the original.
  training <- suppressWarnings(createDataFrame(iris))
  m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species)
  s <- summary(m)

  modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp")
  write.ml(m, modelPath)
  # writing to an existing path must fail unless overwrite is requested
  expect_error(write.ml(m, modelPath))
  write.ml(m, modelPath, overwrite = TRUE)
  m2 <- read.ml(modelPath)
  s2 <- summary(m2)

  expect_equal(s$coefficients, s2$coefficients)
  expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
  expect_equal(s$dispersion, s2$dispersion)
  expect_equal(s$null.deviance, s2$null.deviance)
  expect_equal(s$deviance, s2$deviance)
  expect_equal(s$df.null, s2$df.null)
  expect_equal(s$df.residual, s2$df.residual)
  expect_equal(s$aic, s2$aic)
  expect_equal(s$iter, s2$iter)
  expect_true(!s$is.loaded)
  expect_true(s2$is.loaded)

  # The saved model is a directory, and unlink() only removes directories when
  # recursive = TRUE; without it the temporary model would be left behind.
  unlink(modelPath, recursive = TRUE)
})

test_that("formula of glm", {
  # Same formula cases as for spark.glm, but going through the glm() method for
  # SparkDataFrames; base R glm() on iris provides the reference predictions.
  irisDF <- suppressWarnings(createDataFrame(iris))

  # case 1: "." expansion, a removed term and no intercept
  sparkFit <- glm(Sepal_Width ~ . - Species + 0, data = irisDF)
  sparkPred <- predict(sparkFit, irisDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ . - Species + 0, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # case 2: interaction between a categorical and a numeric feature
  sparkFit <- glm(Sepal_Width ~ Species:Sepal_Length, data = irisDF)
  sparkPred <- predict(sparkFit, irisDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ Species:Sepal.Length, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # case 3: a formula built from long column names must still work
  longDF <- suppressWarnings(createDataFrame(iris))
  longDF$LongLongLongLongLongName <- longDF$Sepal_Width
  longDF$VeryLongLongLongLonLongName <- longDF$Sepal_Length
  longDF$AnotherLongLongLongLongName <- longDF$Species
  sparkFit <- glm(
    LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName,
    data = longDF
  )
  sparkPred <- predict(sparkFit, longDF)
  sparkVals <- collect(select(sparkPred, "prediction"))
  localFit <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)
})

test_that("glm and predict", {
  irisDF <- suppressWarnings(createDataFrame(iris))

  # Predict on irisDF with a Spark model, check that the prediction column
  # holds doubles, and return the collected predictions.
  collectChecked <- function(fit) {
    predicted <- predict(fit, irisDF)
    firstRow <- take(select(predicted, "prediction"), 1)
    expect_equal(typeof(firstRow$prediction), "double")
    collect(select(predicted, "prediction"))
  }

  # gaussian family
  gaussianFit <- glm(Sepal_Width ~ Sepal_Length + Species, data = irisDF)
  sparkVals <- collectChecked(gaussianFit)
  localFit <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- predict(localFit, iris)
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # poisson family
  poissonFit <- glm(
    Sepal_Width ~ Sepal_Length + Species,
    data = irisDF,
    family = poisson(link = identity)
  )
  sparkVals <- collectChecked(poissonFit)
  localVals <- suppressWarnings(
    predict(
      glm(Sepal.Width ~ Sepal.Length + Species, data = iris, family = poisson(link = identity)),
      iris
    )
  )
  expect_true(all(abs(localVals - sparkVals) < 1e-6), localVals - sparkVals)

  # tweedie family
  tweedieFit <- glm(
    Sepal_Width ~ Sepal_Length + Species,
    data = irisDF,
    family = "tweedie", var.power = 1.2, link.power = 0.0
  )
  sparkVals <- collectChecked(tweedieFit)

  # The reference values are rebuilt by hand from hard-coded coefficients so the
  # test does not depend on statmod. The coefficients come from:
  #' library(statmod)
  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
  #'             family = tweedie(var.power = 1.2, link.power = 0.0))
  #' print(coef(rModel))

  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
  design <- model.matrix(Sepal.Width ~ Sepal.Length + Species, data = iris)
  localVals <- exp(as.numeric(design %*% rCoef))
  expect_true(all(abs(localVals - sparkVals) < 1e-5), localVals - sparkVals)

  # stats::predict must still work on a plain lm fit
  x <- rnorm(15)
  y <- x + rnorm(15)
  lmFit <- lm(y ~ x)
  expect_equal(length(predict(lmFit)), 15)
})

test_that("glm summary", {
  # Fits models through the glm() method for SparkDataFrames and compares the
  # summaries with those of base R glm().
  #
  # Row names are compared with expect_equal() rather than all(a == b): `==`
  # recycles its operands, so a length mismatch could otherwise go unnoticed.

  # prepare dataset
  Sepal.Length <- c(2.0, 1.5, 1.8, 3.4, 5.1, 1.8, 1.0, 2.3)
  Sepal.Width <- c(2.1, 2.3, 5.4, 4.7, 3.1, 2.1, 3.1, 5.5)
  Petal.Length <- c(1.8, 2.1, 7.1, 2.5, 3.7, 6.3, 2.2, 7.2)
  Species <- c("setosa", "versicolor", "versicolor", "versicolor", "virginica", "virginica",
               "versicolor", "virginica")
  dataset <- data.frame(Sepal.Length, Sepal.Width, Petal.Length, Species, stringsAsFactors = TRUE)

  # gaussian family
  training <- suppressWarnings(createDataFrame(dataset))
  stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training))

  rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))

  coefs <- stats$coefficients
  rCoefs <- rStats$coefficients
  expect_true(all(abs(rCoefs - coefs) < 1e-4))
  expect_equal(
    rownames(stats$coefficients),
    c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))
  expect_equal(stats$dispersion, rStats$dispersion)
  expect_equal(stats$null.deviance, rStats$null.deviance)
  expect_equal(stats$deviance, rStats$deviance)
  expect_equal(stats$df.null, rStats$df.null)
  expect_equal(stats$df.residual, rStats$df.residual)
  expect_equal(stats$aic, rStats$aic)

  # binomial family
  df <- suppressWarnings(createDataFrame(dataset))
  training <- df[df$Species %in% c("versicolor", "virginica"), ]
  stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training,
                       family = binomial(link = "logit")))

  rTraining <- dataset[dataset$Species %in% c("versicolor", "virginica"), ]
  rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining,
                        family = binomial(link = "logit")))

  coefs <- stats$coefficients
  rCoefs <- rStats$coefficients
  expect_true(all(abs(rCoefs - coefs) < 1e-4))
  expect_equal(
    rownames(stats$coefficients),
    c("(Intercept)", "Sepal_Length", "Sepal_Width"))
  expect_equal(stats$dispersion, rStats$dispersion)
  expect_equal(stats$null.deviance, rStats$null.deviance)
  expect_equal(stats$deviance, rStats$deviance)
  expect_equal(stats$df.null, rStats$df.null)
  expect_equal(stats$df.residual, rStats$df.residual)
  expect_equal(stats$aic, rStats$aic)

  # Test summary works on base GLM models
  # (this check uses the full iris data, not the small dataset above)
  baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
  baseSummary <- summary(baseModel)
  expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
})

test_that("glm save/load", {
  # Round-trips a model fitted through glm() on a SparkDataFrame through
  # write.ml/read.ml and checks that the reloaded summary matches the original.
  training <- suppressWarnings(createDataFrame(iris))
  m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
  s <- summary(m)

  modelPath <- tempfile(pattern = "glm", fileext = ".tmp")
  write.ml(m, modelPath)
  # writing to an existing path must fail unless overwrite is requested
  expect_error(write.ml(m, modelPath))
  write.ml(m, modelPath, overwrite = TRUE)
  m2 <- read.ml(modelPath)
  s2 <- summary(m2)

  expect_equal(s$coefficients, s2$coefficients)
  expect_equal(rownames(s$coefficients), rownames(s2$coefficients))
  expect_equal(s$dispersion, s2$dispersion)
  expect_equal(s$null.deviance, s2$null.deviance)
  expect_equal(s$deviance, s2$deviance)
  expect_equal(s$df.null, s2$df.null)
  expect_equal(s$df.residual, s2$df.residual)
  expect_equal(s$aic, s2$aic)
  expect_equal(s$iter, s2$iter)
  expect_true(!s$is.loaded)
  expect_true(s2$is.loaded)

  # The saved model is a directory, and unlink() only removes directories when
  # recursive = TRUE; without it the temporary model would be left behind.
  unlink(modelPath, recursive = TRUE)
})

test_that("spark.glm and glm with string encoding", {
  # Compares base R glm() with Spark fits under different orderings of the
  # string-valued predictors.
  titanic <- as.data.frame(Titanic, stringsAsFactors = FALSE)
  titanicDF <- createDataFrame(titanic)

  # base R
  baseFit <- stats::glm(Freq ~ Sex + Age, family = "gaussian", data = titanic)
  # spark.glm with default stringIndexerOrderType = "frequencyDesc"
  fitDefault <- spark.glm(titanicDF, Freq ~ Sex + Age, family = "gaussian")
  # spark.glm with stringIndexerOrderType = "alphabetDesc"
  fitSparkAlpha <- spark.glm(
    titanicDF, Freq ~ Sex + Age,
    family = "gaussian",
    stringIndexerOrderType = "alphabetDesc"
  )
  # glm with stringIndexerOrderType = "alphabetDesc"
  fitGlmAlpha <- glm(
    Freq ~ Sex + Age,
    family = "gaussian", data = titanicDF,
    stringIndexerOrderType = "alphabetDesc"
  )
  sparkFits <- list(fitDefault, fitSparkAlpha, fitGlmAlpha)

  rStats <- summary(baseFit)
  rCoefs <- rStats$coefficients
  sparkSummaries <- lapply(sparkFits, summary)
  # order by coefficient size since column rendering may be different
  ord <- order(rCoefs[, 1])

  # default encoding does not produce same results as R
  defaultCoefs <- sparkSummaries[[1]]$coefficients
  expect_false(all(abs(rCoefs[ord, ] - defaultCoefs[ord, ]) < 1e-4))

  # all estimates should be the same as R with stringIndexerOrderType = "alphabetDesc"
  for (sparkStats in sparkSummaries[2:3]) {
    expect_true(all(abs(rCoefs[ord, ] - sparkStats$coefficients[ord, ]) < 1e-4))
    expect_equal(sparkStats$dispersion, rStats$dispersion)
    expect_equal(sparkStats$null.deviance, rStats$null.deviance)
    expect_equal(sparkStats$deviance, rStats$deviance)
    expect_equal(sparkStats$df.null, rStats$df.null)
    expect_equal(sparkStats$df.residual, rStats$df.residual)
    expect_equal(sparkStats$aic, rStats$aic)
  }

  # fitted values should be equal regardless of string encoding
  rVals <- predict(baseFit, titanic)
  for (fit in sparkFits) {
    vals <- collect(select(predict(fit, titanicDF), "prediction"))
    expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
  }
})

test_that("spark.isoreg", {
  # Fits an antitonic (isotonic = FALSE) regression on five weighted points and
  # checks the fitted values, predictions on new data, and model persistence.
  label <- c(7.0, 5.0, 3.0, 5.0, 1.0)
  feature <- c(0.0, 1.0, 2.0, 3.0, 4.0)
  weight <- c(1.0, 1.0, 1.0, 1.0, 1.0)
  # cbind() takes the column names from the variable names above
  data <- as.data.frame(cbind(label, feature, weight))
  df <- createDataFrame(data)

  model <- spark.isoreg(df, label ~ feature, isotonic = FALSE,
                        weightCol = "weight")
  # only allow one variable on the right hand side of the formula
  expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE))
  result <- summary(model)
  expect_equal(result$predictions, list(7, 5, 4, 4, 1))

  # Test model prediction
  predict_data <- list(list(-2.0), list(-1.0), list(0.5),
                       list(0.75), list(1.0), list(2.0), list(9.0))
  predict_df <- createDataFrame(predict_data, c("feature"))
  predict_result <- collect(select(predict(model, predict_df), "prediction"))
  expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0))

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-isoreg", fileext = ".tmp")
    write.ml(model, modelPath)
    # writing to an existing path must fail unless overwrite is requested
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    expect_equal(result, summary(model2))

    # The saved model is a directory, and unlink() only removes directories
    # when recursive = TRUE; without it the temporary model would be left behind.
    unlink(modelPath, recursive = TRUE)
  }
})

test_that("spark.survreg", {
  # Checks spark.survreg coefficients and predictions against values
  # precomputed with survival::survreg, then model persistence, then (when the
  # survival package is installed) the effect of stringIndexerOrderType.
  #
  # R code to reproduce the result.
  #
  #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
  #'               x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
  #' library(survival)
  #' model <- survreg(Surv(time, status) ~ x + sex, rData)
  #' summary(model)
  #' predict(model, rData)
  #
  # -- output of 'summary(model)'
  #
  #              Value Std. Error     z        p
  # (Intercept)  1.315      0.270  4.88 1.07e-06
  # x           -0.190      0.173 -1.10 2.72e-01
  # sex         -0.253      0.329 -0.77 4.42e-01
  # Log(scale)  -1.160      0.396 -2.93 3.41e-03
  #
  # -- output of 'predict(model, rData)'
  #
  #        1        2        3        4        5        6        7
  # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269
  #
  data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0),
          list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1))
  df <- createDataFrame(data, c("time", "status", "x", "sex"))
  model <- spark.survreg(df, Surv(time, status) ~ x + sex)
  stats <- summary(model)
  coefs <- as.vector(stats$coefficients[, 1])
  rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800)
  expect_equal(coefs, rCoefs, tolerance = 1e-4)
  # expect_equal() also catches a length mismatch, which all(a == b) would
  # hide through recycling
  expect_equal(
    rownames(stats$coefficients),
    c("(Intercept)", "x", "sex", "Log(scale)"))
  p <- collect(select(predict(model, df), "prediction"))
  expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035,
               2.390146, 2.891269, 2.891269), tolerance = 1e-4)

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp")
    write.ml(model, modelPath)
    # writing to an existing path must fail unless overwrite is requested
    expect_error(write.ml(model, modelPath))
    write.ml(model, modelPath, overwrite = TRUE)
    model2 <- read.ml(modelPath)
    stats2 <- summary(model2)
    coefs2 <- as.vector(stats2$coefficients[, 1])
    expect_equal(coefs, coefs2)
    expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients))

    # The saved model is a directory, and unlink() only removes directories
    # when recursive = TRUE; without it the temporary model would be left behind.
    unlink(modelPath, recursive = TRUE)
  }

  # Test survival::survreg
  if (requireNamespace("survival", quietly = TRUE)) {
    rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0),
                 x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1))
    # expect_error(..., NA) asserts that the call raises no error
    expect_error(
      model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData),
      NA)
    expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4)

    # Test stringIndexerOrderType
    rData <- as.data.frame(rData)
    rData$sex2 <- c("female", "male")[rData$sex + 1]
    df <- createDataFrame(rData)
    expect_error(
      rModel <- survival::survreg(survival::Surv(time, status) ~ x + sex2, rData), NA)
    rCoefs <- as.numeric(summary(rModel)$table[, 1])
    model <- spark.survreg(df, Surv(time, status) ~ x + sex2)
    coefs <- as.vector(summary(model)$coefficients[, 1])
    o <- order(rCoefs)
    # stringIndexerOrderType = "frequencyDesc" produces different estimates from R
    expect_false(all(abs(rCoefs[o] - coefs[o]) < 1e-4))

    # stringIndexerOrderType = "alphabetDesc" produces the same estimates as R
    model <- spark.survreg(df, Surv(time, status) ~ x + sex2,
                           stringIndexerOrderType = "alphabetDesc")
    coefs <- as.vector(summary(model)$coefficients[, 1])
    expect_true(all(abs(rCoefs[o] - coefs[o]) < 1e-4))
  }
})

# This test used to be nested inside the "spark.survreg" test above, so it was
# silently skipped whenever that test's body raised an error. It uses nothing
# from that test and now runs on its own.
test_that("spark.lm", {
  # Fits a regularized linear regression on iris and checks prediction output
  # and model persistence.
  df <- suppressWarnings(createDataFrame(iris))

  model <- spark.lm(
    df,  Sepal_Width ~ .,
    regParam = 0.01, maxIter = 10
  )

  prediction1 <- predict(model, df)
  expect_is(prediction1, "SparkDataFrame")

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-lm", fileext = ".tmp")
    write.ml(model, modelPath)
    model2 <- read.ml(modelPath)

    expect_is(model2, "LinearRegressionModel")
    expect_equal(summary(model), summary(model2))

    prediction2 <- predict(model2, df)
    expect_equal(
      collect(prediction1),
      collect(prediction2)
    )
    # The saved model is a directory, and unlink() only removes directories
    # when recursive = TRUE; without it the temporary model would be left behind.
    unlink(modelPath, recursive = TRUE)
  }
})


test_that("spark.fmRegressor", {
  # Fits a factorization machine regressor on iris and checks prediction
  # output and model persistence.
  df <- suppressWarnings(createDataFrame(iris))

  model <- spark.fmRegressor(
    df,  Sepal_Width ~ .,
    regParam = 0.01, maxIter = 10, fitLinear = TRUE
  )

  prediction1 <- predict(model, df)
  expect_is(prediction1, "SparkDataFrame")

  # Test model save/load
  if (windows_with_hadoop()) {
    modelPath <- tempfile(pattern = "spark-fmregressor", fileext = ".tmp")
    write.ml(model, modelPath)
    model2 <- read.ml(modelPath)

    expect_is(model2, "FMRegressionModel")
    expect_equal(summary(model), summary(model2))

    prediction2 <- predict(model2, df)
    expect_equal(
      collect(prediction1),
      collect(prediction2)
    )
    # The saved model is a directory, and unlink() only removes directories
    # when recursive = TRUE; without it the temporary model would be left behind.
    unlink(modelPath, recursive = TRUE)
  }
})

# Stop the Spark session once all tests in this file have run.
sparkR.session.stop()
